在 Airflow 當中有提供 PostgresOperator 這個東西,他可以直接對資料庫進行 SQL 指令的執行,不過對於已經用習慣 ORM 方式操作資料庫的人,實在是有點難受,筆者這邊有在網路上找到有大神寫了一個 SQLAlchemyOperator 可以在 Airflow 當中進行 sqlalchemy 的操作與使用,下方為簡易的說明
程式僅用於參考、教學用途,若有問題,請留言告知,非常感謝
附上簡單的目錄架構
運作邏輯:
execute_callable
函式使其在呼叫 python function 的時候,會傳入一個名為 session 的物件"""
ref:https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html
"""
from airflow.operators.python import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook
def get_session(conn_id: str) -> Session:
hook = PostgresHook(postgres_conn_id=conn_id)
engine = hook.get_sqlalchemy_engine()
return sessionmaker(bind=engine)()
class SQLAlchemyOperator(PythonOperator):
@apply_defaults
def __init__(
self,
conn_id: str,
*args, **kwargs):
self.conn_id = conn_id
super().__init__(*args, **kwargs)
def execute_callable(self):
session = get_session(self.conn_id)
try:
result = self.python_callable(*self.op_args,
session=session,
**self.op_kwargs)
except Exception:
session.rollback()
raise
session.commit()
return result
備註: Exhibitions 為透過 sqlalchemy 建立的資料表,不清楚的人可以參考 這篇文章
from airflow.decorators import dag
from operators.sqlalchemy_operator import SQLAlchemyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from sqlalchemy.orm import Session
from models.exhibitions import Exhibitions, Base
from sqlalchemy import inspect
class Settings:
conn_id = "postgres_connection"
start_task_id = "start_task"
end_task_id = "end_task"
create_table_task_id = "create_table"
我們可以看到,在參數的部分有準備了一個 session 變數,並且提示為 Session 物件,此函式接到 session 後即會開始執行建立資料表的動作,這部分就不贅述了
def create_table(session: Session):
insp = inspect(session.get_bind())
if not insp.has_table(Exhibitions.__tablename__):
Base.metadata.tables[Exhibitions.__tablename__].create(
session.get_bind())
print("資料表建立成功")
透過使用 SQLAlchemyOperator 建立 operator 在呼叫指定函式時,會自動傳入一個 session 物件,所以在對應的函示當中必須準備一個名為 session 的參數,或是使用 **kwargs
來接,才不會造成錯誤
@dag(start_date=datetime.today(), tags=['user'])
def create_table_dag():
start_task = EmptyOperator(task_id=Settings.start_task_id)
create_table_task = SQLAlchemyOperator(task_id=Settings.create_table_task_id,
python_callable=create_table,
conn_id=Settings.conn_id,
trigger_rule=TriggerRule.ALWAYS)
end_task = EmptyOperator(task_id=Settings.end_task_id,
trigger_rule=TriggerRule.ALWAYS)
start_task >> create_table_task >> end_task
create_create_table_dag = create_table_dag()